# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later

"""
Memory Management
-----------------

The ``drgn.helpers.linux.mm`` module provides helpers for working with the
Linux memory management (MM) subsystem.

.. _virtual address translation failures:

Helpers that translate virtual addresses or read virtual memory may fail for
multiple reasons:

1. If the address is invalid.
2. If the address is swapped or paged out.
3. If the address is in `high memory
   <https://docs.kernel.org/mm/highmem.html>`_. High memory is only used for
   userspace memory by 32-bit systems with a lot of physical memory, and only
   if ``CONFIG_HIGHMEM`` is enabled.

   3a. If the page table is in high memory. This is only possible if
   ``CONFIG_HIGHPTE`` is enabled.
"""

import operator
import os
import re
from typing import Callable, Iterable, Iterator, List, NamedTuple, Optional, Tuple

from _drgn import (
    _linux_helper_direct_mapping_offset,
    _linux_helper_follow_phys,
    _linux_helper_read_vm,
)
from drgn import (
    NULL,
    Architecture,
    IntegerLike,
    Object,
    ObjectAbsentError,
    ObjectNotFoundError,
    Program,
    TypeKind,
    cast,
    container_of,
)
from drgn.helpers.common.format import decode_enum_type_flags
from drgn.helpers.common.prog import takes_program_or_default
from drgn.helpers.linux.bitops import for_each_set_bit
from drgn.helpers.linux.device import bus_for_each_dev
from drgn.helpers.linux.fs import d_path
from drgn.helpers.linux.list import list_for_each_entry
from drgn.helpers.linux.mapletree import mt_for_each, mtree_load
from drgn.helpers.linux.mmzone import _highest_present_section_nr, _section_flags
from drgn.helpers.linux.percpu import percpu_counter_sum, percpu_counter_sum_positive
from drgn.helpers.linux.pid import for_each_task_in_group
from drgn.helpers.linux.rbtree import rb_find

__all__ = (
    "PFN_PHYS",
    "PHYS_PFN",
    "PageCompound",
    "PageHead",
    "PageSlab",
    "PageTail",
    "access_process_vm",
    "access_remote_vm",
    "cmdline",
    "compound_head",
    "compound_nr",
    "compound_order",
    "decode_memory_block_state",
    "decode_page_flags",
    "decode_page_flags_value",
    "environ",
    "find_vmap_area",
    "follow_page",
    "follow_pfn",
    "follow_phys",
    "for_each_memory_block",
    "for_each_page",
    "for_each_valid_page_range",
    "for_each_valid_pfn_and_page",
    "for_each_vma",
    "for_each_vmap_area",
    "in_direct_map",
    "memory_block_size_bytes",
    "page_flags",
    "page_index",
    "page_size",
    "page_to_pfn",
    "page_to_phys",
    "page_to_virt",
    "pfn_to_page",
    "pfn_to_virt",
    "phys_to_page",
    "phys_to_virt",
    "task_rss",
    "totalram_pages",
    "virt_to_page",
    "virt_to_pfn",
    "virt_to_phys",
    "vm_commit_limit",
    "vm_memory_committed",
    "vma_find",
    "vma_name",
    "vmalloc_to_page",
    "vmalloc_to_pfn",
    # Generated by scripts/generate_page_flag_getters.py.
    "PageActive",
    "PageChecked",
    "PageDirty",
    "PageDoubleMap",
    "PageError",
    "PageForeign",
    "PageHWPoison",
    "PageHasHWPoisoned",
    "PageIdle",
    "PageIsolated",
    "PageLRU",
    "PageLocked",
    "PageMappedToDisk",
    "PageMlocked",
    "PageOwnerPriv1",
    "PagePinned",
    "PagePrivate",
    "PagePrivate2",
    "PageReadahead",
    "PageReclaim",
    "PageReferenced",
    "PageReported",
    "PageReserved",
    "PageSavePinned",
    "PageSkipKASanPoison",
    "PageSlobFree",
    "PageSwapBacked",
    "PageUncached",
    "PageUnevictable",
    "PageUptodate",
    "PageVmemmapSelfHosted",
    "PageWaiters",
    "PageWorkingset",
    "PageWriteback",
    "PageXenRemapped",
    "PageYoung",
)


def page_flags(page: Object) -> Object:
    """
    Return a page's flags.

    This handles kernel versions before and after the page flags were moved to
    `memdesc_flags_t`.

    :param page: ``struct page *``
    :return: ``unsigned long``
    """
    flags = page.flags
    # Linux kernel commit 53fbef56e07d ("mm: introduce memdesc_flags_t") (in
    # v6.18) wrapped struct page::flags in a memdesc_flags_t whose '.f' member
    # holds the actual unsigned long flags. Older kernels store the flags word
    # directly.
    try:
        return flags.f
    except AttributeError:
        return flags


def page_index(page: Object) -> Object:
    """
    Return a page's offset (in pages) within its mapping.

    :param page: ``struct page *``
    :return: ``pgoff_t``
    """
    # The member was renamed in acc53a0b4c15 ("mm: rename page->index to
    # page->__folio_index") (in v6.16); fall back to the old name when the
    # new one is absent.
    index = getattr(page, "__folio_index", None)
    if index is None:
        index = page.index
    return index

def PageActive(page: Object) -> bool:
    """
    Return whether the ``PG_active`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_active enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_active"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageChecked(page: Object) -> bool:
    """
    Return whether the ``PG_checked`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_checked enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_checked"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageDirty(page: Object) -> bool:
    """
    Return whether the ``PG_dirty`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_dirty enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_dirty"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageDoubleMap(page: Object) -> bool:
    """
    Return whether the ``PG_double_map`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_double_map enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_double_map"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageError(page: Object) -> bool:
    """
    Return whether the ``PG_error`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_error enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_error"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageForeign(page: Object) -> bool:
    """
    Return whether the ``PG_foreign`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_foreign enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_foreign"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageHWPoison(page: Object) -> bool:
    """
    Return whether the ``PG_hwpoison`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_hwpoison enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_hwpoison"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageHasHWPoisoned(page: Object) -> bool:
    """
    Return whether the ``PG_has_hwpoisoned`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_has_hwpoisoned enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_has_hwpoisoned"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageIdle(page: Object) -> bool:
    """
    Return whether the ``PG_idle`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_idle enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_idle"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageIsolated(page: Object) -> bool:
    """
    Return whether the ``PG_isolated`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_isolated enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_isolated"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageLRU(page: Object) -> bool:
    """
    Return whether the ``PG_lru`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_lru enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_lru"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageLocked(page: Object) -> bool:
    """
    Return whether the ``PG_locked`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_locked enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_locked"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageMappedToDisk(page: Object) -> bool:
    """
    Return whether the ``PG_mappedtodisk`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_mappedtodisk enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_mappedtodisk"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageMlocked(page: Object) -> bool:
    """
    Return whether the ``PG_mlocked`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_mlocked enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_mlocked"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageOwnerPriv1(page: Object) -> bool:
    """
    Return whether the ``PG_owner_priv_1`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_owner_priv_1 enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_owner_priv_1"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PagePinned(page: Object) -> bool:
    """
    Return whether the ``PG_pinned`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_pinned enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_pinned"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PagePrivate(page: Object) -> bool:
    """
    Return whether the ``PG_private`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_private enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_private"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PagePrivate2(page: Object) -> bool:
    """
    Return whether the ``PG_private_2`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_private_2 enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_private_2"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageReadahead(page: Object) -> bool:
    """
    Return whether the ``PG_readahead`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_readahead enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_readahead"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageReclaim(page: Object) -> bool:
    """
    Return whether the ``PG_reclaim`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_reclaim enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_reclaim"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageReferenced(page: Object) -> bool:
    """
    Return whether the ``PG_referenced`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_referenced enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_referenced"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageReported(page: Object) -> bool:
    """
    Return whether the ``PG_reported`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_reported enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_reported"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageReserved(page: Object) -> bool:
    """
    Return whether the ``PG_reserved`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_reserved enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_reserved"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageSavePinned(page: Object) -> bool:
    """
    Return whether the ``PG_savepinned`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_savepinned enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_savepinned"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageSkipKASanPoison(page: Object) -> bool:
    """
    Return whether the ``PG_skip_kasan_poison`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_skip_kasan_poison enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_skip_kasan_poison"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageSlobFree(page: Object) -> bool:
    """
    Return whether the ``PG_slob_free`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_slob_free enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_slob_free"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageSwapBacked(page: Object) -> bool:
    """
    Return whether the ``PG_swapbacked`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_swapbacked enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_swapbacked"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageUncached(page: Object) -> bool:
    """
    Return whether the ``PG_uncached`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_uncached enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_uncached"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageUnevictable(page: Object) -> bool:
    """
    Return whether the ``PG_unevictable`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_unevictable enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_unevictable"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageUptodate(page: Object) -> bool:
    """
    Return whether the ``PG_uptodate`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_uptodate enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_uptodate"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageVmemmapSelfHosted(page: Object) -> bool:
    """
    Return whether the ``PG_vmemmap_self_hosted`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_vmemmap_self_hosted enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_vmemmap_self_hosted"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageWaiters(page: Object) -> bool:
    """
    Return whether the ``PG_waiters`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_waiters enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_waiters"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageWorkingset(page: Object) -> bool:
    """
    Return whether the ``PG_workingset`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_workingset enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_workingset"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageWriteback(page: Object) -> bool:
    """
    Return whether the ``PG_writeback`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_writeback enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_writeback"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageXenRemapped(page: Object) -> bool:
    """
    Return whether the ``PG_xen_remapped`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_xen_remapped enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_xen_remapped"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


def PageYoung(page: Object) -> bool:
    """
    Return whether the ``PG_young`` flag is set on a page.

    :param page: ``struct page *``
    """
    # The PG_young enumerator does not exist in every kernel
    # version/configuration; treat a missing flag as never set.
    try:
        flag = page.prog_["PG_young"]
    except KeyError:
        return False
    return bool(page_flags(page) & (1 << flag))


# End generated by scripts/generate_page_flag_getters.py.


def _get_PageSlab_impl(prog: Program) -> Callable[[Object], bool]:
    # Return (and cache per-program) the PageSlab() predicate appropriate for
    # this kernel.
    #
    # Since Linux kernel commit 46df8e73a4a3 ("mm: free up PG_slab") (in
    # v6.10), slab pages are identified by a page type, which is indicated by a
    # mapcount value matching a value in VMCOREINFO. Before that, they are
    # indicated by a page flag.
    try:
        return prog.cache["PageSlab"]
    except KeyError:
        pass
    vmcoreinfo = prog["VMCOREINFO"].string_()
    match = re.search(
        rb"^NUMBER\(PAGE_SLAB_MAPCOUNT_VALUE\)=(-?[0-9]+)$", vmcoreinfo, flags=re.M
    )
    if match:
        # Page-type scheme: slab pages have this exact _mapcount value.
        PAGE_SLAB_MAPCOUNT_VALUE = int(match.group(1))

        def PageSlab(page: Object) -> bool:
            return page._mapcount.counter.value_() == PAGE_SLAB_MAPCOUNT_VALUE

    else:
        # Page-flag scheme: slab pages have PG_slab set.
        mask = 1 << prog["PG_slab"]

        def PageSlab(page: Object) -> bool:
            return bool(page_flags(page) & mask)

    prog.cache["PageSlab"] = PageSlab
    return PageSlab


def PageSlab(page: Object) -> bool:
    """
    Return whether a page belongs to the slab allocator.

    :param page: ``struct page *``
    """
    # The actual check depends on the kernel version; look up the cached
    # implementation for this program and apply it.
    impl = _get_PageSlab_impl(page.prog_)
    return impl(page)


def PageCompound(page: Object) -> bool:
    """
    Return whether a page is part of a `compound page
    <https://lwn.net/Articles/619514/>`_.

    :param page: ``struct page *``
    """
    # Read the pointer value once up front.
    page = page.read_()
    # Since Linux kernel commit 1d798ca3f164 ("mm: make compound_head()
    # robust") (in v4.4), PG_head is always defined, and a tail page has the
    # least significant bit of compound_head set. Before that, there is no
    # compound_head (and no fake head pages). Instead, if
    # CONFIG_PAGEFLAGS_EXTENDED=y, then PG_head and PG_tail are defined.
    # Otherwise, there is only PG_compound, and PG_reclaim is set for tail
    # pages and clear for head pages.
    try:
        PG_head = page.prog_["PG_head"]
    except KeyError:
        return bool(page_flags(page) & (1 << page.prog_["PG_compound"]))
    else:
        flags = page_flags(page)
        if flags & (1 << PG_head):
            return True
        try:
            # Low bit of compound_head set means this is a tail page.
            return bool(page.compound_head.read_() & 1)
        except AttributeError:
            # No compound_head member: PG_head/PG_tail kernel (pre-v4.4 with
            # CONFIG_PAGEFLAGS_EXTENDED=y).
            return bool(flags & (1 << page.prog_["PG_tail"]))

# HugeTLB Vmemmap Optimization (HVO) creates "fake" head pages that are
# actually tail pages. See Linux kernel commit e7d324850bfc ("mm: hugetlb: free
# the 2nd vmemmap page associated with each HugeTLB page") (in v5.18) and
# https://www.kernel.org/doc/html/latest/mm/vmemmap_dedup.html.
def _page_is_fake_head(page: Object) -> bool:
    # The next struct page's compound_head has its low bit set for tail
    # pages; if it does not point back at this page, this "head" is fake.
    next_compound_head = page[1].compound_head.value_()
    if not (next_compound_head & 1):
        return False
    return (next_compound_head - 1) != page.value_()


def PageHead(page: Object) -> bool:
    """
    Return whether a page is a head page in a `compound page`_.

    :param page: ``struct page *``
    """
    # Read the pointer value once up front.
    page = page.read_()
    # See PageCompound() re: Linux kernel commit 1d798ca3f164 ("mm: make
    # compound_head() robust") (in v4.4).
    try:
        PG_head = page.prog_["PG_head"]
    except KeyError:
        # Only PG_compound exists: a head page has PG_compound set and
        # PG_reclaim clear (both set would mean a tail page).
        PG_compound = page.prog_["PG_compound"]
        PG_head_mask = 1 << PG_compound
        PG_head_tail_mask = PG_head_mask | (1 << page.prog_["PG_reclaim"])
        return (page_flags(page) & PG_head_tail_mask) == PG_head_mask
    else:
        if not (page_flags(page) & (1 << PG_head)):
            return False
        try:
            # PG_head is set, but HVO may have made this a "fake" head page.
            return not _page_is_fake_head(page)
        except AttributeError:
            # No compound_head member (pre-v4.4): fake head pages don't exist.
            return True


def PageTail(page: Object) -> bool:
    """
    Return whether a page is a tail page in a `compound page`_.

    :param page: ``struct page *``
    """
    # Read the pointer value once up front.
    page = page.read_()
    # See PageCompound() re: Linux kernel commit 1d798ca3f164 ("mm: make
    # compound_head() robust") (in v4.4).
    try:
        # Low bit of compound_head set means this is a tail page.
        if page.compound_head.value_() & 1:
            return True
    except AttributeError:
        # No compound_head member (pre-v4.4).
        try:
            PG_tail = page.prog_["PG_tail"]
        except KeyError:
            # Only PG_compound exists: a tail page has both PG_compound and
            # PG_reclaim set.
            PG_head_tail_mask = (1 << page.prog_["PG_compound"]) | (
                1 << page.prog_["PG_reclaim"]
            )
            return (page_flags(page) & PG_head_tail_mask) == PG_head_tail_mask
        else:
            return bool(page_flags(page) & (1 << PG_tail))
    # compound_head's low bit was clear, but a "fake" head page (see
    # _page_is_fake_head()) is actually a tail page.
    if page_flags(page) & (1 << page.prog_["PG_head"]):
        return _page_is_fake_head(page)
    return False


def compound_head(page: Object) -> Object:
    """
    Get the head page associated with a page.

    If *page* is a tail page, this returns the head page of the `compound
    page`_ it belongs to. Otherwise, it returns *page*.

    :param page: ``struct page *``
    :return: ``struct page *``
    """
    # Read the pointer value once up front.
    page = page.read_()
    try:
        head = page.compound_head.read_()
    except AttributeError:
        # Before Linux kernel commit 1d798ca3f164 ("mm: make compound_head()
        # robust") (in v4.4), the head page is in page->first_page, and there
        # are no fake head pages.
        return page.first_page.read_() if PageTail(page) else page
    # The low bit flags a tail page; the remaining bits are the head page
    # pointer.
    if head & 1:
        return cast(page.type_, head - 1)
    # Handle fake head pages (see _page_is_fake_head()).
    if page_flags(page) & (1 << page.prog_["PG_head"]):
        head = page[1].compound_head.read_()
        if head & 1:
            return cast(page.type_, head - 1)
    return page


def compound_order(page: Object) -> Object:
    """
    Return the allocation order of a potentially `compound page`_.

    :param page: ``struct page *``
    :return: ``unsigned int``
    """
    prog = page.prog_

    # Non-head pages (including tail pages) are treated as order 0.
    if not PageHead(page):
        return Object(prog, "unsigned int", 0)

    # Since Linux kernel commit ebc1baf5c9b4 ("mm: free up a word in the first
    # tail page") (in v6.6), the compound order is in the low byte of struct
    # folio::_flags_1 (from_folio = 2). Between that and Linux kernel commit
    # 379708ffde1b ("mm: add the first tail page to struct folio") (in v6.1),
    # the compound order is in struct folio::_folio_order (from_folio = 1).
    # Before Linux kernel commit 1c5509be58f6 ("mm: remove 'First tail page'
    # members from struct page") (in v6.3), the compound order is in struct
    # page::compound_order of the first tail page (from_folio = 0).
    try:
        from_folio = prog.cache["compound_order_from_folio"]
    except KeyError:
        # Determine which layout this kernel uses once and cache it.
        from_folio = 0
        try:
            struct_folio = prog.type("struct folio")
        except LookupError:
            pass
        else:
            if struct_folio.has_member("_folio_order"):
                from_folio = 1
            elif struct_folio.has_member("_flags_1"):
                from_folio = 2
        prog.cache["compound_order_from_folio"] = from_folio
    if from_folio == 2:
        return cast("unsigned int", cast("struct folio *", page)._flags_1 & 0xFF)
    elif from_folio == 1:
        return cast("unsigned int", cast("struct folio *", page)._folio_order)
    else:
        return cast("unsigned int", page[1].compound_order)


def compound_nr(page: Object) -> Object:
    """
    Return the number of pages in a potentially `compound page`_.

    :param page: ``struct page *``
    :return: ``unsigned long``
    """
    # 1 << order, computed in the target's unsigned long type.
    one = Object(page.prog_, "unsigned long", 1)
    return one << compound_order(page)


def page_size(page: Object) -> Object:
    """
    Return the number of bytes in a potentially `compound page`_.

    :param page: ``struct page *``
    :return: ``unsigned long``
    """
    order = compound_order(page)
    return page.prog_["PAGE_SIZE"] << order


def decode_page_flags(page: Object) -> str:
    """
    Get a human-readable representation of the flags set on a page.

    >>> decode_page_flags(page)
    'PG_uptodate|PG_dirty|PG_lru|PG_reclaim|PG_swapbacked|PG_readahead|PG_savepinned|PG_isolated|PG_reported'

    :param page: ``struct page *``
    """
    flags = page_flags(page)
    return decode_page_flags_value(flags)


@takes_program_or_default
def decode_page_flags_value(prog: Program, flags: IntegerLike) -> str:
    """
    Get a human-readable representation of the flags value from a page.

    >>> flags = page_flags(page).read_()
    >>> hex(flags)
    0xfffffd0004028
    >>> decode_page_flags_value(flags)
    'PG_uptodate|PG_lru|PG_private|PG_reported'

    See also the :func:`decode_page_flags()` shortcut, which takes a
    ``struct page *`` instead.

    :param flags: ``unsigned long``
    """
    nr_pageflags = prog["__NR_PAGEFLAGS"]
    # Mask off bits above the flag bits (e.g. the section/node/zone fields
    # stored in the upper part of the flags word).
    pageflags_mask = (1 << nr_pageflags.value_()) - 1
    value = operator.index(flags) & pageflags_mask
    return decode_enum_type_flags(value, nr_pageflags.type_)


# Get the struct page * for PFN 0.
def _page0(prog: Program) -> Object:
    if "page0" not in prog.cache:
        try:
            # With CONFIG_SPARSEMEM_VMEMMAP=y, page 0 is vmemmap.
            page0 = prog["vmemmap"]
        except KeyError:
            page0 = prog["mem_map"] - prog["ARCH_PFN_OFFSET"]
        # The struct page array is not contiguous for CONFIG_SPARSEMEM=y with
        # CONFIG_SPARSEMEM_VMEMMAP=n or CONFIG_DISCONTIGMEM=y, so those are
        # not supported yet.
        prog.cache["page0"] = page0
    return prog.cache["page0"]


@takes_program_or_default
def for_each_page(prog: Program) -> Iterator[Object]:
    """
    Iterate over every valid ``struct page *``.

    :return: Iterator of ``struct page *`` objects.
    """
    # Walk each contiguous range of valid PFNs and yield the corresponding
    # struct page pointers.
    for first_pfn, last_pfn, mem_map in for_each_valid_page_range(prog):
        pfn = first_pfn
        while pfn < last_pfn:
            yield mem_map + pfn
            pfn += 1


@takes_program_or_default
def for_each_valid_pfn_and_page(prog: Program) -> Iterator[Tuple[int, Object]]:
    """
    Iterate over every valid page frame number (PFN) and ``struct page *``.

    :return: Iterator of (``pfn``, ``struct page *``) tuples.
    """
    # Same walk as for_each_page(), but also yield the PFN itself.
    for first_pfn, last_pfn, mem_map in for_each_valid_page_range(prog):
        for pfn in range(first_pfn, last_pfn):
            page = mem_map + pfn
            yield pfn, page


def _for_each_valid_page_range_flatmem(
    prog: Program,
) -> Iterator[Tuple[int, int, Object]]:
    # FLATMEM fallback for for_each_valid_page_range(): mem_map is one
    # contiguous array, so validity is purely a matter of PFN ranges.
    mem_map = _page0(prog)

    if (
        prog.platform.arch  # type: ignore[union-attr]  # platform can't be None
        == Architecture.ARM
    ):
        # Since Linux kernel commit a4d5613c4dc6 ("arm: extend pfn_valid to
        # take into account freed memory map alignment") (in v5.14), Arm's
        # pfn_valid() checks that the PFN lies within a pageblock that
        # intersects a present memory chunk. However, pageblock_nr_pages is a
        # macro, and it's not easy to get its value. So, we get as close as we
        # can and ignore the extra pages granted by the pageblock alignment
        # (which is also what the kernel did before Linux kernel commit
        # 09414d00a137 ("ARM: only consider memblocks with NOMAP cleared for
        # linear mapping") (in v4.5)).
        page_shift = prog["PAGE_SHIFT"].value_()
        memory = prog["memblock"].memory
        prev_start = prev_end = None
        for region in memory.regions[: memory.cnt]:
            start = region.base.value_()
            # end is computed from the unshifted base, then both are converted
            # to PFNs; the order of these two statements matters.
            end = (start + region.size.value_()) >> page_shift
            start >>= page_shift
            if start == prev_end:
                prev_end = end  # Merge adjacent regions.
            else:
                if prev_start is not None:
                    yield prev_start, prev_end, mem_map
                prev_start = start
                prev_end = end
        # Flush the last pending (merged) region, if any.
        if prev_start is not None:
            yield prev_start, prev_end, mem_map  # type: ignore  # prev_end can't be None
        return

    # Generic FLATMEM validity.
    start_pfn = prog["ARCH_PFN_OFFSET"].value_()
    yield start_pfn, start_pfn + prog["max_mapnr"].value_(), mem_map


@takes_program_or_default
def for_each_valid_page_range(prog: Program) -> Iterator[Tuple[int, int, Object]]:
    """
    Iterate over every contiguous range of valid page frame numbers and
    ``struct page``\\ s.

    >>> for start_pfn, end_pfn, mem_map in for_each_valid_page_range():
    ...     pages = mem_map[start_pfn:end_pfn]

    :return: Iterator of (``start_pfn``, ``end_pfn``, ``mem_map``) tuples.
        ``start_pfn`` is the minimum page frame number (PFN) in the range
        (inclusive). ``end_pfn`` is the maximum PFN in the range (exclusive).
        ``mem_map`` is a ``struct page *`` object such that ``mem_map[pfn]`` is
        the ``struct page`` for the given PFN.
    """
    # mem_section only exists for CONFIG_SPARSEMEM=y; otherwise fall back to
    # the FLATMEM logic.
    try:
        mem_section = prog["mem_section"]
    except ObjectNotFoundError:
        yield from _for_each_valid_page_range_flatmem(prog)
        return

    # To support SPARSEMEM without SPARSEMEM_VMEMMAP, we will need to check
    # whether each section's mem_map is contiguous.
    mem_map = prog["vmemmap"].read_()

    PAGE_SHIFT = prog["PAGE_SHIFT"].value_()
    SECTIONS_PER_ROOT = prog["SECTIONS_PER_ROOT"].value_()
    SECTION_SIZE_BITS = prog["SECTION_SIZE_BITS"].value_()
    PAGES_PER_SECTION = 1 << (SECTION_SIZE_BITS - PAGE_SHIFT)
    # NOTE(review): hard-coded to match the kernel's SUBSECTION_SHIFT, which
    # is not available from debug info — confirm it stays 21.
    SUBSECTION_SHIFT = 21
    SUBSECTIONS_PER_SECTION = 1 << (SECTION_SIZE_BITS - SUBSECTION_SHIFT)
    PAGES_PER_SUBSECTION = 1 << (SUBSECTION_SHIFT - PAGE_SHIFT)
    flags = _section_flags(prog)
    SECTION_HAS_MEM_MAP = flags["SECTION_HAS_MEM_MAP"]
    SECTION_IS_EARLY = flags["SECTION_IS_EARLY"]

    highest_present_section_nr = _highest_present_section_nr(prog)
    nr_roots = highest_present_section_nr // SECTIONS_PER_ROOT + 1

    # mem_section is either an array of roots or a pointer to one, and each
    # root is either an array of sections or a pointer to one; handle both
    # shapes at each level.
    unaliased_type = mem_section.type_.unaliased()
    if unaliased_type.kind == TypeKind.POINTER:
        mem_section = mem_section.read_()
        if not mem_section:
            return

    root_kind = unaliased_type.type.unaliased_kind()

    pfn = 0
    start_pfn = None
    for root_nr, root in enumerate(mem_section[:nr_roots]):
        if root_kind == TypeKind.POINTER:
            root = root.read_()
            if not root:
                # A NULL root ends any pending valid range and skips a whole
                # root's worth of PFNs.
                if start_pfn is not None:
                    yield start_pfn, pfn, mem_map
                    start_pfn = None
                pfn += SECTIONS_PER_ROOT * PAGES_PER_SECTION
                continue

        if root_nr == nr_roots - 1:
            nr_sections = highest_present_section_nr % SECTIONS_PER_ROOT + 1
        else:
            nr_sections = SECTIONS_PER_ROOT
        for section in root[:nr_sections]:
            mem_map_value = section.section_mem_map.value_()
            # Open-coded valid_section() and early_section() to avoid some
            # overhead.
            if mem_map_value & SECTION_HAS_MEM_MAP:
                # struct mem_section::usage only exists since Linux kernel
                # commit f1eca35a0dc7 ("mm/sparsemem: introduce struct
                # mem_section_usage") (in v5.3). Additionally, struct
                # mem_section_usage::subsection_map only exists for
                # CONFIG_SPARSEMEM_VMEMMAP. Without both, as well as for early
                # sections, validity has section granularity.
                subsection_map = None
                if not (mem_map_value & SECTION_IS_EARLY):
                    try:
                        subsection_map = section.usage.subsection_map
                    except AttributeError:
                        pass

                if subsection_map is None:
                    # Whole section is valid; start a range if not in one.
                    if start_pfn is None:
                        start_pfn = pfn
                else:
                    # Subsection granularity: merge runs of set bits into PFN
                    # ranges, continuing any range carried in from the
                    # previous section.
                    end_bit = None if start_pfn is None else 0
                    for bit in for_each_set_bit(
                        subsection_map, SUBSECTIONS_PER_SECTION
                    ):
                        if bit != end_bit:
                            if start_pfn is not None:
                                yield start_pfn, pfn + end_bit * PAGES_PER_SUBSECTION, mem_map
                            start_pfn = pfn + bit * PAGES_PER_SUBSECTION
                        end_bit = bit + 1
                    if end_bit != SUBSECTIONS_PER_SECTION and start_pfn is not None:
                        yield start_pfn, pfn + end_bit * PAGES_PER_SUBSECTION, mem_map
                        start_pfn = None
            elif start_pfn is not None:
                # Invalid section ends any pending valid range.
                yield start_pfn, pfn, mem_map
                start_pfn = None
            pfn += PAGES_PER_SECTION
    # Flush a range that extends to the last present section.
    if start_pfn is not None:
        yield start_pfn, pfn, mem_map


@takes_program_or_default
def PFN_PHYS(prog: Program, pfn: IntegerLike) -> Object:
    """
    Get the physical address of a page frame number (PFN).

    :param pfn: ``unsigned long``
    :return: ``phys_addr_t``
    """
    return Object(prog, "phys_addr_t", pfn) << prog["PAGE_SHIFT"]


@takes_program_or_default
def PHYS_PFN(prog: Program, addr: IntegerLike) -> Object:
    """
    Convert a physical address to its page frame number (PFN).

    :param addr: ``phys_addr_t``
    :return: ``unsigned long``
    """
    # The PFN is the physical address shifted right by the page size bits.
    page_shift = prog["PAGE_SHIFT"]
    return Object(prog, "unsigned long", addr) >> page_shift


def page_to_pfn(page: Object) -> Object:
    """
    Return the page frame number (PFN) of a page.

    :param page: ``struct page *``
    :return: ``unsigned long``
    """
    # The PFN is the index of this page within the (virtual) page array
    # starting at PFN 0.
    first_page = _page0(page.prog_)
    return cast("unsigned long", page - first_page)


def page_to_phys(page: Object) -> Object:
    """
    Return the physical address of a page.

    :param page: ``struct page *``
    :return: ``phys_addr_t``
    """
    # Convert page -> PFN -> physical address.
    pfn = page_to_pfn(page)
    return PFN_PHYS(pfn)


def page_to_virt(page: Object) -> Object:
    """
    Return the directly mapped virtual address of a page.

    :param page: ``struct page *``
    :return: ``void *``
    """
    # Convert page -> PFN -> direct-map virtual address.
    pfn = page_to_pfn(page)
    return pfn_to_virt(pfn)


@takes_program_or_default
def pfn_to_page(prog: Program, pfn: IntegerLike) -> Object:
    """
    Return the page with a given page frame number (PFN).

    :param pfn: ``unsigned long``
    :return: ``struct page *``
    """
    # Index the (virtual) page array starting at PFN 0.
    first_page = _page0(prog)
    return first_page + pfn


@takes_program_or_default
def pfn_to_virt(prog: Program, pfn: IntegerLike) -> Object:
    """
    Return the directly mapped virtual address of a page frame number (PFN).

    :param pfn: ``unsigned long``
    :return: ``void *``
    """
    # Convert PFN -> physical address -> direct-map virtual address.
    phys = PFN_PHYS(prog, pfn)
    return phys_to_virt(phys)


@takes_program_or_default
def phys_to_page(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the page containing a physical address.

    :param addr: ``phys_addr_t``
    :return: ``struct page *``
    """
    # Convert physical address -> PFN -> page.
    pfn = PHYS_PFN(prog, addr)
    return pfn_to_page(pfn)


@takes_program_or_default
def phys_to_virt(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the directly mapped virtual address of a physical address.

    :param addr: ``phys_addr_t``
    :return: ``void *``
    """
    # The direct map is a constant offset from physical memory; add it to
    # translate physical -> virtual.
    offset = _linux_helper_direct_mapping_offset(prog)
    return Object(prog, "void *", operator.index(addr) + offset)


@takes_program_or_default
def virt_to_page(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the page containing a directly mapped virtual address.

    .. _mm-helpers-direct-map:

    .. note::

        This only works for virtual addresses from the "direct map". This
        includes address from:

        * kmalloc
        * Slab allocator
        * Page allocator

        But not:

        * vmalloc
        * vmap
        * ioremap
        * Symbols (function pointers, global variables)

        For vmalloc or vmap addresses, use :func:`vmalloc_to_page(addr)
        <vmalloc_to_page>`. For arbitrary kernel addresses, use
        :func:`follow_page(prog["init_mm"].address_of_(), addr) <follow_page>`.

    :param addr: ``void *``
    :return: ``struct page *``
    """
    # Convert direct-map virtual address -> PFN -> page.
    pfn = virt_to_pfn(prog, addr)
    return pfn_to_page(pfn)


@takes_program_or_default
def virt_to_pfn(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the page frame number (PFN) of a directly mapped virtual address.

    .. note::

        This only works for virtual addresses from the :ref:`"direct map"
        <mm-helpers-direct-map>`. For vmalloc or vmap addresses, use
        :func:`vmalloc_to_pfn(addr) <vmalloc_to_pfn>`. For arbitrary kernel
        addresses, use :func:`follow_pfn(prog["init_mm"].address_of_(), addr)
        <follow_pfn>`.

    :param addr: ``void *``
    :return: ``unsigned long``
    """
    # Convert direct-map virtual address -> physical address -> PFN.
    phys = virt_to_phys(prog, addr)
    return PHYS_PFN(phys)


@takes_program_or_default
def virt_to_phys(prog: Program, addr: IntegerLike) -> Object:
    """
    Get the physical address of a directly mapped virtual address.

    .. note::

        This only works for virtual addresses from the :ref:`"direct map"
        <mm-helpers-direct-map>`. For arbitrary kernel addresses, use
        :func:`follow_phys(prog["init_mm"].address_of_(), addr) <follow_phys>`.

    :param addr: ``void *``
    :return: ``phys_addr_t``
    """
    # Subtract the constant direct-map offset to translate virtual ->
    # physical. Use phys_addr_t to match the documented return type and
    # follow_phys(); unsigned long may be narrower than phys_addr_t (e.g. on
    # 32-bit kernels with PAE, where phys_addr_t is 64 bits).
    return Object(
        prog,
        "phys_addr_t",
        operator.index(addr) - _linux_helper_direct_mapping_offset(prog),
    )


def follow_page(mm: Object, addr: IntegerLike) -> Object:
    """
    Return the page that a virtual address maps to in a virtual address space.

    >>> task = find_task(113)
    >>> follow_page(task.mm, 0x7fffbbb6d4d0)
    *(struct page *)0xffffbe4bc0337b80 = {
        ...
    }

    :param mm: ``struct mm_struct *``
    :param addr: ``void *``
    :return: ``struct page *``
    :raises FaultError: if the virtual address
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    # Walk the page tables for the physical address, then look up its page.
    phys = follow_phys(mm, addr)
    return phys_to_page(phys)


def follow_pfn(mm: Object, addr: IntegerLike) -> Object:
    """
    Return the page frame number (PFN) that a virtual address maps to in a
    virtual address space.

    >>> task = find_task(113)
    >>> follow_pfn(task.mm, 0x7fffbbb6d4d0)
    (unsigned long)52718

    :param mm: ``struct mm_struct *``
    :param addr: ``void *``
    :return: ``unsigned long``
    :raises FaultError: if the virtual address
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    # Walk the page tables for the physical address, then convert to a PFN.
    phys = follow_phys(mm, addr)
    return PHYS_PFN(phys)


def follow_phys(mm: Object, addr: IntegerLike) -> Object:
    """
    Return the physical address that a virtual address maps to in a virtual
    address space.

    >>> task = find_task(113)
    >>> follow_phys(task.mm, 0x7fffbbb6d4d0)
    (phys_addr_t)215934160

    :param mm: ``struct mm_struct *``
    :param addr: ``void *``
    :return: ``phys_addr_t``
    :raises FaultError: if the virtual address
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    # Delegate the architecture-specific page table walk to the C helper,
    # starting from this address space's top-level page table (mm->pgd).
    prog = mm.prog_
    phys = _linux_helper_follow_phys(prog, mm.pgd, addr)
    return Object(prog, "phys_addr_t", phys)


@takes_program_or_default
def vmalloc_to_page(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the page containing a vmalloc or vmap address.

    >>> task = find_task(113)
    >>> vmalloc_to_page(task.stack)
    *(struct page *)0xffffbe4bc00a2200 = {
        ...
    }

    :param addr: ``void *``
    :return: ``struct page *``
    """
    # vmalloc/vmap mappings live in the kernel address space, so translate
    # through the kernel's page tables (init_mm).
    init_mm = prog["init_mm"].address_of_()
    return follow_page(init_mm, addr)


@takes_program_or_default
def vmalloc_to_pfn(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the page frame number (PFN) containing a vmalloc or vmap address.

    >>> task = find_task(113)
    >>> vmalloc_to_pfn(task.stack)
    (unsigned long)10376

    :param addr: ``void *``
    :return: ``unsigned long``
    """
    # Resolve the backing page, then convert it to a PFN.
    page = vmalloc_to_page(prog, addr)
    return page_to_pfn(page)


def _vmap_area_rb_cmp(addr: int, va: Object) -> int:
    # rb_find() comparator: locate addr relative to [va_start, va_end).
    if addr >= va.va_end.value_():
        return 1
    if addr < va.va_start.value_():
        return -1
    return 0


def _vmap_nodes(prog: Program) -> Object:
    # Note: a KeyError from the prog["vmap_nodes"] lookup deliberately
    # propagates to the caller (it indicates a pre-v6.9 kernel).
    nodes = prog["vmap_nodes"]
    try:
        return nodes.read_()
    except ObjectAbsentError:
        # On !SMP and 32-bit kernels, vmap_nodes is initialized to &single and
        # never reassigned. GCC as of version 12.2 doesn't generate a location
        # description for vmap_nodes in that case, so fall back to the static
        # default node directly.
        return prog.variable("single", "mm/vmalloc.c").address_of_()


@takes_program_or_default
def find_vmap_area(prog: Program, addr: IntegerLike) -> Object:
    """
    Return the ``struct vmap_area *`` containing an address.

    >>> find_vmap_area(0xffffa2b680081000)
    *(struct vmap_area *)0xffffa16541046b40 = {
            ...
    }

    :param addr: Address to look up.
    :return: ``struct vmap_area *`` (``NULL`` if not found)
    """
    addr = operator.index(addr)
    # Since Linux kernel commit d093602919ad ("mm: vmalloc: remove global
    # vmap_area_root rb-tree") (in v6.9), vmap areas are partitioned into
    # per-node red-black trees. Before that, there is one global red-black
    # tree.
    try:
        vmap_nodes = _vmap_nodes(prog)
    except KeyError:
        # Pre-v6.9: search the single global tree.
        return rb_find(
            "struct vm_area",
            prog["vmap_area_root"].address_of_(),
            "rb_node",
            addr,
            _vmap_area_rb_cmp,
        )
    nr_nodes = prog["nr_vmap_nodes"].value_()
    start = (addr // prog["vmap_zone_size"].value_()) % nr_nodes
    # As noted in the kernel implementation, the given address may be in a
    # different node than the one its zone hashes to, so check every node,
    # starting with the most likely one.
    for offset in range(nr_nodes):
        node = vmap_nodes[(start + offset) % nr_nodes]
        va = rb_find(
            "struct vmap_area",
            node.busy.root.address_of_(),
            "rb_node",
            addr,
            _vmap_area_rb_cmp,
        )
        if va:
            return va
    return NULL(prog, "struct vmap_area *")


@takes_program_or_default
def for_each_vmap_area(prog: Program) -> Iterator[Object]:
    """
    Iterate over every ``struct vmap_area *`` on the system.

    >>> for va in for_each_vmap_area():
    ...     caller = ""
    ...     if va.vm:
    ...         try:
    ...             sym = prog.symbol(va.vm.caller)
    ...         except LookupError:
    ...             pass
    ...         else:
    ...             caller = f" {sym.name}"
    ...     print(f"{hex(va.va_start)}-{hex(va.va_end)}{caller}")
    ...
    0xffffa2b680000000-0xffffa2b680005000 irq_init_percpu_irqstack
    0xffffa2b680005000-0xffffa2b680007000 acpi_os_map_iomem
    0xffffa2b68000b000-0xffffa2b68000d000 hpet_enable
    0xffffa2b680080000-0xffffa2b680085000 kernel_clone
    ...

    :return: Iterator of ``struct vmap_area *`` objects.
    """
    # Since Linux kernel commit d093602919ad ("mm: vmalloc: remove global
    # vmap_area_root rb-tree") (in v6.9), vmap areas are partitioned into
    # per-node lists. Before that, there is one global list.
    try:
        vmap_nodes = _vmap_nodes(prog)
    except KeyError:
        # Pre-v6.9: walk the single global list.
        yield from list_for_each_entry(
            "struct vmap_area", prog["vmap_area_list"].address_of_(), "list"
        )
        return
    for i in range(prog["nr_vmap_nodes"].value_()):
        yield from list_for_each_entry(
            "struct vmap_area", vmap_nodes[i].busy.head.address_of_(), "list"
        )


def access_process_vm(task: Object, address: IntegerLike, size: IntegerLike) -> bytes:
    """
    Read memory from a task's virtual address space.

    >>> task = find_task(1490152)
    >>> access_process_vm(task, 0x7f8a62b56da0, 12)
    b'hello, world'

    :param task: ``struct task_struct *``
    :param address: Starting address.
    :param size: Number of bytes to read.
    :raises FaultError: if the virtual address
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    # Translate through the task's address space (task->mm->pgd).
    prog = task.prog_
    return _linux_helper_read_vm(prog, task.mm.pgd, address, size)


def access_remote_vm(mm: Object, address: IntegerLike, size: IntegerLike) -> bytes:
    """
    Read memory from a virtual address space. This is similar to
    :func:`access_process_vm()`, but it takes a ``struct mm_struct *`` instead
    of a ``struct task_struct *``.

    >>> task = find_task(1490152)
    >>> access_remote_vm(task.mm, 0x7f8a62b56da0, 12)
    b'hello, world'

    :param mm: ``struct mm_struct *``
    :param address: Starting address.
    :param size: Number of bytes to read.
    :raises FaultError: if the virtual address
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    # Translate through the given address space's top-level page table.
    prog = mm.prog_
    return _linux_helper_read_vm(prog, mm.pgd, address, size)


def cmdline(task: Object) -> Optional[List[bytes]]:
    """
    Get the list of command line arguments of a task, or ``None`` for kernel tasks.

    >>> cmdline(find_task(1495216))
    [b'vim', b'drgn/helpers/linux/mm.py']

    .. code-block:: console

        $ tr '\\0' ' ' < /proc/1495216/cmdline
        vim drgn/helpers/linux/mm.py

    :param task: ``struct task_struct *``
    :raises FaultError: if the virtual address containing the command line
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    mm = task.mm.read_()
    if not mm:
        # Kernel tasks have no mm and therefore no command line.
        return None
    start = mm.arg_start.value_()
    end = mm.arg_end.value_()
    raw = access_remote_vm(mm, start, end - start)
    # Arguments are NUL-terminated; drop the empty element after the final NUL.
    return raw.split(b"\0")[:-1]


def environ(task: Object) -> Optional[List[bytes]]:
    """
    Get the list of environment variables of a task, or ``None`` for kernel tasks.

    >>> environ(find_task(1497797))
    [b'HOME=/root', b'PATH=/usr/local/sbin:/usr/local/bin:/usr/bin', b'LOGNAME=root']

    .. code-block:: console

        $ tr '\\0' '\\n' < /proc/1497797/environ
        HOME=/root
        PATH=/usr/local/sbin:/usr/local/bin:/usr/bin
        LOGNAME=root

    :param task: ``struct task_struct *``
    :raises FaultError: if the virtual address containing the environment
        :ref:`cannot be translated <virtual address translation failures>`
    :raises NotImplementedError: if virtual address translation is :ref:`not
        supported <architecture support matrix>` for this architecture yet
    """
    mm = task.mm.read_()
    if not mm:
        # Kernel tasks have no mm and therefore no environment.
        return None
    start = mm.env_start.value_()
    end = mm.env_end.value_()
    raw = access_remote_vm(mm, start, end - start)
    # Variables are NUL-terminated; drop the empty element after the final NUL.
    return raw.split(b"\0")[:-1]


def _vma_rb_cmp(addr: int, vma: Object) -> int:
    # rb_find() comparator: locate addr relative to [vm_start, vm_end).
    if addr >= vma.vm_end.value_():
        return 1
    if addr < vma.vm_start.value_():
        return -1
    return 0


def vma_find(mm: Object, addr: IntegerLike) -> Object:
    """
    Return the virtual memory area (VMA) containing an address.

    :param mm: ``struct mm_struct *``
    :param addr: Address to look up.
    :return: ``struct vm_area_struct *`` (``NULL`` if not found)
    """
    try:
        # Since Linux kernel commit 524e00b36e8c ("mm: remove rb tree.") (in
        # v6.1), VMAs are stored in a maple tree.
        maple_tree = mm.mm_mt.address_of_()
    except AttributeError:
        # Before that, they are kept in a red-black tree.
        return rb_find(
            "struct vm_area_struct",
            mm.mm_rb.address_of_(),
            "vm_rb",
            operator.index(addr),
            _vma_rb_cmp,
        )
    return cast("struct vm_area_struct *", mtree_load(maple_tree, addr))


def vma_name(vma: Object) -> str:
    """
    Get the display name or file path for a Virtual Memory Area (VMA).

    This helper returns a human-readable label for a given
    ``struct vm_area_struct *`` object, describing what type of memory region
    it represents. It mimics the VMA naming behavior seen in
    ``/proc/<pid>/maps``.

    :param vma: ``struct vm_area_struct *``
    :return: File path or descriptive name of the VMA region. Possible return
        values include:

        - **File path** (e.g., ``/usr/lib/libc.so.6``):
          For VMAs backed by files (``vma->vm_file`` non-NULL).
        - ``[heap]``:
          Heap segment, between ``mm->start_brk`` and ``mm->brk``.
        - ``[stack]``:
          Stack segment containing ``mm->start_stack``.
        - ``[vdso]``:
          Virtual Dynamic Shared Object area (``mm->context.vdso``).
        - ``[vvar]``, ``[vsyscall]``, etc.:
          Architecture-specific special mappings from ``vm_ops->name``.
        - ``[anon:<name>]``: private anonymous memory with a name set by
          `PR_SET_VMA_ANON_NAME
          <https://man7.org/linux/man-pages/man2/PR_SET_VMA.2const.html>`_.
        - ``[anon_shmem:<name>]``: shared memory with a name set by
          `PR_SET_VMA_ANON_NAME
          <https://man7.org/linux/man-pages/man2/PR_SET_VMA.2const.html>`_.
        - Empty string: anonymous memory.
    """
    prog = vma.prog_
    mm = vma.vm_mm.read_()

    vm_file = vma.vm_file.read_()
    if vm_file:
        # Since Linux kernel commit d09e8ca6cb93 ("mm: anonymous shared memory
        # naming") (in v6.2), vma->anon_name exists and is valid iff
        # CONFIG_ANON_VMA_NAME=y. Between that commit and commit 9a10064f5625
        # ("mm: add a field to store names for private anonymous memory") (in
        # v5.17), it exists in a union with vma->shared, so it is only valid
        # when vma->vm_file is NULL. Before that, it doesn't exist.
        if mm:
            try:
                anon_name = vma.anon_name
            except AttributeError:
                pass
            else:
                # anon_name is only usable alongside vm_file when it is NOT
                # the union member overlapping vma->shared (see the comment
                # above); distinguish the layouts by comparing addresses.
                if anon_name.address_ != vma.shared.address_:
                    anon_name = anon_name.read_()
                    if anon_name:
                        return f"[anon_shmem:{os.fsdecode(anon_name.name.string_())}]"

        return os.fsdecode(d_path(vm_file.f_path))

    vm_ops = vma.vm_ops.read_()
    if vm_ops:
        # Special mappings (e.g. [vvar], [vsyscall]) share a common name
        # callback; their vm_private_data points to the struct
        # vm_special_mapping that holds the actual name.
        vm_ops_name = vm_ops.name.read_()
        if vm_ops_name == prog["special_mapping_name"]:
            return (
                cast("struct vm_special_mapping *", vma.vm_private_data)
                .name.string_()
                .decode()
            )

    if not mm:
        # Match /proc/<pid>/maps, which labels a VMA without an mm "[vdso]".
        return "[vdso]"

    start = vma.vm_start.value_()

    # Before Linux kernel commit c1bab64360e6 ("powerpc/vdso: Move to
    # _install_special_mapping() and remove arch_vma_name()") (in v5.11),
    # PowerPC needs a special case for the vdso: it is identified by
    # mm->context.vdso_base rather than a special mapping.
    if prog.platform.arch == Architecture.PPC64:  # type: ignore[union-attr]  # platform can't be None.
        try:
            vdso_base = mm.context.vdso_base
        except AttributeError:
            pass
        else:
            if start == vdso_base.value_():
                return "[vdso]"

    end = vma.vm_end.value_()

    # [heap]: the VMA overlaps the [start_brk, brk] range.
    if (
        mm.start_brk
        and mm.brk
        and start <= mm.brk.value_()
        and end >= mm.start_brk.value_()
    ):
        return "[heap]"

    # [stack]: the VMA contains the stack start address.
    if mm.start_stack and start <= mm.start_stack.value_() <= end:
        return "[stack]"

    # See above regarding the existence of vma->anon_name.
    try:
        anon_name = vma.anon_name.read_()
    except AttributeError:
        pass
    else:
        if anon_name:
            return f"[anon:{os.fsdecode(anon_name.name.string_())}]"

    # Plain anonymous memory has no name.
    return ""


def for_each_vma(mm: Object) -> Iterator[Object]:
    """
    Iterate over every virtual memory area (VMA) in a virtual address space.

    >>> for vma in for_each_vma(task.mm):
    ...     print(vma)
    ...
    *(struct vm_area_struct *)0xffff97ad82bfc930 = {
        ...
    }
    *(struct vm_area_struct *)0xffff97ad82bfc0a8 = {
        ...
    }
    ...

    :param mm: ``struct mm_struct *``
    :return: Iterator of ``struct vm_area_struct *`` objects.
    """
    try:
        # Since Linux kernel commit 763ecb035029 ("mm: remove the vma linked
        # list") (in v6.1), VMAs are stored in a maple tree.
        maple_tree = mm.mm_mt.address_of_()
    except AttributeError:
        # Before that, they are chained through vma->vm_next.
        vma = mm.mmap
        while vma:
            yield vma
            vma = vma.vm_next
    else:
        vma_type = mm.prog_.type("struct vm_area_struct *")
        for _, _, entry in mt_for_each(maple_tree):
            yield cast(vma_type, entry)


@takes_program_or_default
def totalram_pages(prog: Program) -> int:
    """Return the total number of RAM pages."""
    # Since Linux kernel commit ca79b0c211af63fa32 ("mm: convert
    # totalram_pages and totalhigh_pages variables to atomic") (in v5.0), the
    # count is stored in the atomic _totalram_pages; before that, it is the
    # plain variable totalram_pages.
    try:
        pages = prog["_totalram_pages"].counter
    except KeyError:
        pages = prog["totalram_pages"]
    return pages.value_()


@takes_program_or_default
def vm_commit_limit(prog: Program) -> int:
    """Get the limit on committed virtual address space in pages."""
    overcommit_kbytes = prog["sysctl_overcommit_kbytes"].value_()
    if overcommit_kbytes:
        # vm.overcommit_kbytes is set: convert kilobytes to pages.
        allowed = overcommit_kbytes >> (prog["PAGE_SHIFT"].value_() - 10)
    else:
        # Avoid circular import.
        from drgn.helpers.linux.hugetlb import hugetlb_total_pages

        # Otherwise, the limit is a percentage (vm.overcommit_ratio) of
        # non-hugetlb RAM.
        ratio = prog["sysctl_overcommit_ratio"].value_()
        allowed = (totalram_pages(prog) - hugetlb_total_pages(prog)) * ratio // 100
    # Swap space is always added on top.
    return allowed + prog["total_swap_pages"].value_()


@takes_program_or_default
def vm_memory_committed(prog: Program) -> int:
    """Get the number of pages of committed virtual address space."""
    # vm_committed_as is a per-CPU counter; sum it, clamping to >= 0.
    counter = prog["vm_committed_as"].address_of_()
    return percpu_counter_sum_positive(counter)


class PageUsage(NamedTuple):
    """Memory usage statistic in page units."""

    pages: int
    """Total number of pages."""

    free_pages: int
    """Number of pages that are free."""

    @property
    def used_pages(self) -> int:
        """Number of pages in use (total minus free)."""
        return self.pages - self.free_pages


@takes_program_or_default
def in_direct_map(prog: Program, addr: IntegerLike) -> bool:
    """
    Return whether an address is within the kernel's direct memory mapping.

    :param addr: address to check
    """
    # The direct map covers [min_low_pfn, max_low_pfn] inclusive, so the end
    # is one page past the virtual address of max_low_pfn.
    lo = pfn_to_virt(prog["min_low_pfn"]).value_()
    hi = (pfn_to_virt(prog["max_low_pfn"]) + prog["PAGE_SIZE"]).value_()
    return lo <= operator.index(addr) < hi


class TaskRss(NamedTuple):
    """Task's resident set size returned by :func:`task_rss()`."""

    file: int
    """Number of resident file pages."""
    anon: int
    """Number of resident anonymous pages."""
    shmem: int
    """Number of resident shared memory pages."""
    swap: int
    """Number of swapped-out anonymous private pages."""

    @property
    def total(self) -> int:
        """
        Total number of resident pages (:attr:`file` + :attr:`anon` +
        :attr:`shmem`).
        """
        # Swap is excluded: swapped-out pages are not resident.
        return self.file + self.anon + self.shmem


@takes_program_or_default
def task_rss(prog: Program, task: Object) -> TaskRss:
    """
    Return a task's resident set size (RSS) in pages.

    The task's RSS is the number of pages which are currently resident in
    memory. The RSS values can be broken down into anonymous pages (not bound
    to any file), file pages (those associated with memory mapped files), and
    shared memory pages (those which aren't associated with on-disk files, but
    belonging to shared memory mappings). This function returns a named tuple
    containing each category, but the common behavior is to use the
    :attr:`~TaskRss.total` value which sums them up.

    :param task: ``struct task_struct *``
    """

    mm = task.mm.read_()

    # Kthreads have a NULL mm, simply skip them, returning 0.
    if not mm:
        return TaskRss(0, 0, 0, 0)

    prog = task.prog_
    rss_stat = mm.rss_stat

    # Indices into the per-counter-kind rss_stat array.
    MM_FILEPAGES = prog.constant("MM_FILEPAGES").value_()
    MM_ANONPAGES = prog.constant("MM_ANONPAGES").value_()
    MM_SWAPENTS = prog.constant("MM_SWAPENTS").value_()

    # This counter was added in Linux kernel commit eca56ff906bd ("mm, shmem:
    # add internal shmem resident memory accounting") (in v4.5).
    try:
        MM_SHMEMPAGES = prog.constant("MM_SHMEMPAGES").value_()
    except LookupError:
        # Sentinel: no shmem counter on this kernel; report shmem RSS as 0.
        MM_SHMEMPAGES = -1

    if rss_stat.type_.kind == TypeKind.ARRAY:
        # Since Linux kernel commit f1a7941243c10 ("mm: convert mm's rss stats
        # into percpu_counter") (in v6.2), the "rss_stat" object is an array of
        # percpu counters. Simply sum them up!
        filerss = percpu_counter_sum(rss_stat[MM_FILEPAGES].address_of_())
        anonrss = percpu_counter_sum(rss_stat[MM_ANONPAGES].address_of_())
        swapents = percpu_counter_sum(rss_stat[MM_SWAPENTS].address_of_())
        shmemrss = percpu_counter_sum(rss_stat[MM_SHMEMPAGES].address_of_())
    else:
        # Prior to this, the "rss_stat" was a structure containing counters that
        # were cached on each task_struct and periodically updated into the
        # mm_struct. We start with the counter values from the mm_struct and
        # then sum up the cached copies from each thread.
        filerss = rss_stat.count[MM_FILEPAGES].counter.value_()
        anonrss = rss_stat.count[MM_ANONPAGES].counter.value_()
        swapents = rss_stat.count[MM_SWAPENTS].counter.value_()
        shmemrss = 0
        if MM_SHMEMPAGES >= 0:
            shmemrss = rss_stat.count[MM_SHMEMPAGES].counter.value_()

        # Add the uncommitted per-thread deltas from every thread in the
        # thread group (including this task itself).
        for gtask in for_each_task_in_group(task, include_self=True):
            # Kernel configurations with a small NR_CPUS don't have the
            # per-thread cache.
            try:
                rss_stat = gtask.rss_stat
            except AttributeError:
                break
            filerss += rss_stat.count[MM_FILEPAGES].value_()
            anonrss += rss_stat.count[MM_ANONPAGES].value_()
            swapents += rss_stat.count[MM_SWAPENTS].value_()
            if MM_SHMEMPAGES >= 0:
                shmemrss += rss_stat.count[MM_SHMEMPAGES].value_()

    return TaskRss(filerss, anonrss, shmemrss, swapents)


@takes_program_or_default
def for_each_memory_block(prog: Program) -> Iterable[Object]:
    """
    Iterate over all memory hotplug blocks.

    :return: Iterator of ``struct memory_block *`` objects.
    """
    # Memory blocks are registered as devices on the memory subsystem bus;
    # recover each containing struct memory_block from its embedded device.
    block_type = prog.type("struct memory_block")
    subsys = prog["memory_subsys"].address_of_()
    for dev in bus_for_each_dev(subsys):
        yield container_of(dev, block_type, "dev")


# Map of memory_block::state values to the names of the MEM_* macros.
# These are macros that haven't changed since they were introduced.
_MEMORY_BLOCK_STATE = {
    # Added in Linux kernel commit 3947be1969a9 ("[PATCH] memory hotplug: sysfs
    # and add/remove functions") (in v2.6.15).
    (1 << 0): "MEM_ONLINE",
    (1 << 1): "MEM_GOING_OFFLINE",
    (1 << 2): "MEM_OFFLINE",
    # Added in Linux kernel commit 7b78d335ac15 ("memory hotplug: rearrange
    # memory hotplug notifier") (in v2.6.24).
    (1 << 3): "MEM_GOING_ONLINE",
    (1 << 4): "MEM_CANCEL_ONLINE",
    (1 << 5): "MEM_CANCEL_OFFLINE",
    # Added in Linux kernel commit c5f1e2d18909 ("mm/memory_hotplug: introduce
    # MEM_PREPARE_ONLINE/MEM_FINISH_OFFLINE notifiers") (in v6.8).
    (1 << 6): "MEM_PREPARE_ONLINE",
    (1 << 7): "MEM_FINISH_OFFLINE",
}


def decode_memory_block_state(mem: Object) -> str:
    """
    Get a human-readable representation of the state of a memory hotplug block.

    >>> decode_memory_block_state(mem)
    'MEM_ONLINE'

    :param mem: ``struct memory_block *``
    """
    # Look up the MEM_* macro name for the block's current state value.
    state = mem.state.value_()
    return _MEMORY_BLOCK_STATE[state]


@takes_program_or_default
def memory_block_size_bytes(prog: Program) -> Object:
    """
    Return the size of a memory hotplug block in bytes.

    This is the unit that can be hot(un)plugged.

    :return: ``unsigned long``
    """
    # A block is sections_per_block sections, each 1 << SECTION_SIZE_BITS
    # bytes.
    sections_per_block = cast("unsigned long", prog["sections_per_block"])
    return sections_per_block << prog["SECTION_SIZE_BITS"]
